package com.hm.base.dispersed.calculate.su;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;

import com.hm.base.dispersed.calculate.conf.DispersedCalculate;
import com.hm.base.dispersed.calculate.conf.DispersedCalculateJobRunTagStatusEnum;
import com.hm.base.dispersed.calculate.conf.DispersedCalculateTaskRunStatusEnum;
import com.hm.common.exception.ServiceException;
import com.hm.common.redis.jedis.JedisFactory;
import com.hm.common.redis.jedis.su.JedisHandler;
import com.hm.common.redis.jedis.su.JedisHandlerSupport;
import com.hm.common.util.CommonUtil;

/**
 * @author shishun.wang
 * @date 上午9:51:43 2017年9月18日
 * @version 1.0
 * @describe
 */
@Component
public class DispersedCalculateSupport {

	private int jobRunTagLifeTime = 60 * 30;

	@Autowired
	private JedisHandlerSupport jedisHandler;

	@Autowired
	private TaskExecutor taskExecutor;

	public void execute(String businessTaskName, DispersedCalculateHandler calculateHandler) {
		this.execute(businessTaskName, this.jobRunTagLifeTime, calculateHandler);
	}

	public void execute(String businessTaskName, int jobRunTagLifeTime, DispersedCalculateHandler calculateHandler) {
		Object jobRunTag = initialization(businessTaskName, jobRunTagLifeTime);
		taskExecutor.execute(new Runnable() {

			@Override
			public void run() {
				try {
					calculateHandler.handler((Long) jobRunTag);
					recordJobRunTagStatus(businessTaskName, jobRunTag,
							DispersedCalculateJobRunTagStatusEnum.SUCCESSFUL);
				} catch (Exception e) {
					// 工作job数据处理失败
					recordJobRunTagStatus(businessTaskName, jobRunTag, DispersedCalculateJobRunTagStatusEnum.FAILURE);
				}

			}
		});
	}

	private Object initialization(String businessTaskName, int jobRunTagLifeTime) {
		// final String businessTaskName = "MY_BUSINESS";

		final Object jobRunTag = loadJobRunTag(businessTaskName, jobRunTagLifeTime);
		if (CommonUtil.isEmpty(jobRunTag)) {
			throw ServiceException.warn("当前分布式计算任务没找到对应的,工作JOB标识");
		}

		// 工作job数据处理中
		recordJobRunTagStatus(businessTaskName, jobRunTag, DispersedCalculateJobRunTagStatusEnum.PROCESSING);

		return jobRunTag;
	}

	private void recordJobRunTagStatus(String businessTaskName, final Object jobRunTag,
			DispersedCalculateJobRunTagStatusEnum jobRunTagStatus) {
		jedisHandler.executeCommand(new JedisHandler() {

			@Override
			public Object handler(JedisFactory jedis) {
				jedis.set(DispersedCalculate.JOB_RUN_TAG_STATUS + ":" + businessTaskName + ":" + jobRunTag,
						jobRunTagStatus.status());
				return null;
			}
		});
	}

	private Object loadJobRunTag(String businessTaskName, int jobRunTagLifeTime) {
		final Object jobRunTag = jedisHandler.executeCommand(new JedisHandler() {

			@Override
			public Object handler(JedisFactory jedis) {
				String taskRunStatusKey = DispersedCalculate.TASK_RUN_STATUS + ":" + businessTaskName;
				String jobRunTagKey = DispersedCalculate.JOB_RUN_TAG + ":" + businessTaskName;

				String status = jedis.get(taskRunStatusKey);
				if (CommonUtil.isEmpty(status)) {
					jedis.set(taskRunStatusKey, DispersedCalculateTaskRunStatusEnum.RUNNING.status());
					return jedis.incr(jobRunTagKey, jobRunTagLifeTime);
				}

				DispersedCalculateTaskRunStatusEnum statusEnum = DispersedCalculateTaskRunStatusEnum.trance(status);
				if (statusEnum == DispersedCalculateTaskRunStatusEnum.FINISHED) {
					jedis.set(taskRunStatusKey, DispersedCalculateTaskRunStatusEnum.RUNNING.status());
					jedis.remove(jobRunTagKey);
					return jedis.incr(jobRunTagKey, jobRunTagLifeTime);
				}

				if (statusEnum == DispersedCalculateTaskRunStatusEnum.RUNNING) {
					return jedis.incr(jobRunTagKey, jobRunTagLifeTime);
				}

				throw ServiceException.warn("当前分布式计算任务状态不是已完成状态,不能执行任务");
			}
		});
		return jobRunTag;
	}

}
